Using sparse convolutions to generate noisy inputs#

Basic principle#

Note

We are interested here in 1-d signals, but this algorithm generalizes easily to \(n\)-d. The \(n\)-d form is the one found in (Lewis, 1989).

The sparse convolution algorithm (Lewis, 1989) is an effective way to generate a noise signal with a prescribed covariance. Moreover, it produces a so-called “solid noise”: a continuous function which can be evaluated at any \(t\).[1] This algorithm was originally designed to produce computationally efficient, naturalistic noise for computer graphics, but the same features make it uniquely suited for use as an input noise when integrating systems with standard ODE integrators. Even integrators which adapt their step size are supported.

The idea is to generate the desired signal \(ξ\) by convolving a kernel \(h\) with a Poisson noise process \(γ\):

(1)#\[\begin{equation} ξ(t) = h \ast γ = \int h(t-t') γ(t') \,dt' \,. \end{equation}\]

The Poisson noise is defined as

(2)#\[γ(t) = \sum_k a_k \, δ(t - t_k) \,, \]

where both the weights \(a_k\) and the impulse times \(t_k\) are stochastic and uncorrelated. (It particular this means that the distribution of \(a_k\) should satisfy \(\braket{a_k} = 0\).) Because the impulses are sparse, the convolution reduces to an efficient summation:

(3)#\[\begin{equation} ξ(t) = \sum_k a_k h(t - t_k) \,, \end{equation}\]

and the autocorrelation is given by

(4)#\[\begin{equation} \bigl\langle{ξ(t)ξ(t-t')}\bigr\rangle = \sum_k \Braket{a_k^2} h(t-t_k)h(t'-t_k) \,. \end{equation}\]

Remark

Assuming the derivatives of the kernel function are known, derivatives of the generated noise are also easy to evaluate:

\[ξ^{(n)}(t) = \sum_k a_k h^{(n)}(t-t_k)\]

Notation

We use

\[\begin{equation*} \small \tilde{ξ}(ω) = \frac{1}{\sqrt{2π}}\int e^{-iωt} ξ(t)\,dt \end{equation*}\]

to denote the Fourier transform of \(ξ\), and

\[\begin{align*} \small S_{ξξ}(ω) &= \lvert \tilde{ξ}(ω) \rvert^2 \\ &= \frac{1}{\sqrt{2π}}\int\!dt\,e^{-iωt} \braket{ξ(t)ξ(t-τ)}_τ \end{align*}\]

to denote its power spectrum.[2]

Moreover, the power spectrum \(S_{γγ}\) of \(γ\) is flat:

\[\begin{align*} \Bigl\langle{γ(t)γ(t')}\Bigr\rangle &= \bigl\langle{a^2}\bigr\rangle \, δ(t-t') \\ S_{γγ}(ω) &= \frac{1}{\sqrt{2π}} \int \Braket{γ(t)γ(t')} e^{-iωt} \,dt = \frac{ρ\braket{a^2}}{\sqrt{2π}} \end{align*}\]

which is why \(γ\) is referred to as a sparse white noise.

In particular this means that the power spectrum of the autocorrelation \(\braket{ξ(t)ξ(t')}\) is proportional to that of \(h\):[3][4]

\[\begin{align*} S_{ξξ}(ω) &= S_{γγ}(ω) S_{hh}(ω) \\ &= \frac{ρ\braket{a^2}}{\sqrt{2π}} S_{hh}(ω) \\ &= \frac{ρ\braket{a^2}}{\sqrt{2π}} \lvert \tilde{h}(ω) \rvert^2 \,. \end{align*}\]

To get the correlation function, we can use the Wiener-Khinchine theorem: applying the inverse Fourier transform to \(S_{ξξ}\) will yield the autocorrelation:

(5)#\[\begin{equation} C_{ξξ}(t) = \Bigl\langle{ξ(t)ξ(t+s)\Bigr\rangle} = \frac{ρ\braket{a^2}}{\sqrt{2π}} \mathcal{F}^{-1}\bigl\{\,\lvert\tilde{h}\rvert^2 \,\bigr\} \,. \end{equation}\]

In general this is not possible to do analytically, but for particular choices of \(h\) it is. Perhaps the most obvious one is a Gaussian kernel; then \(h(s)\), \(\tilde{h}(ω)\), \(|\tilde{h}(ω)|\) and \(C_{ξξ}(s)\) are all Gaussian. Another one would be a boxcar kernel. In general, if we have a function \(\tilde{h}(ω)\) for which the Fourier transforms of both \(\tilde{h}(ω)\) and \(|\tilde{h}(ω)|^2\) are known, then it is possible to select the kernel given the desired autocorrelation length.

Summary

  • To generate a noise \(ξ\) with an given autocorrelation spectrum, we need to choose a kernel \(h\) with the same autocorrelation spectrum (up to some constant).

  • This constant is proportional to the variance of the weights \(a\).

Comparison with other methods#

Sparse convolution
  • ✔ Can use a standard ODE integrator.

  • ✔ Dense output: Can be evaluated at arbitrary \(t\).

  • ✔ Technically simple.

  • ✔ Large amount of control over the autocorrelation function.

  • ✘ Autocorrelation function is controlled via its spectrum. In general closed form expressions relating a kernel \(h\) to a autocorrelation \(C_{ξξ}\) do not exist.

  • ✔ Online algorithm: can easily generate new points.

  • ✔ Fast: Number of operations per time point is in the order of \(\mathcal{O}(5mρ)\), where \(m\) is the number of neighbouring bins we sum over and \(ρ\) is the number of impulses per bin. In particular, this is independent of the total simulation time.

Integrating Wiener noise
  • ✘ Requires a stochastic (SDE) integrator.

  • ✘ Sparse trace: trace is composed of discrete time steps. Generating a new intermediate point requires re-integrating everything after that point.[6]

  • ✔ Borderline trivial, if one is familiar with stochastic integrals. Otherwise conceptually and technically difficult.

  • ✔ The autocorrelation is known: it is determined by the number of surrogate variables added for the integrals.

  • ✘ Limited control over the correlation function: limited to those obtainable by adding a reasonable number of surrogate variables.

  • ✔ Online algorithm: can easily generate new points.

  • ✔ Very fast: New points are generated by drawing a Gaussian and performing a few sums.
    However, the need to use a low-order stochastic integrator may inflate computation time.

Generating random spectra
  • ✔ Can use a standard ODE integrator (I think).

  • ✔ Dense output possible: If we represent as a sum of sine waves instead of the FFT vector, the function can be evaluated at arbitrary \(t\). However the spectrum is only valid above some time resolution \(Δt\).

  • ✔ Full, direct control over the autocorrelation shape.

  • ✘ Technically difficult: It is difficult to get the FFTs to scale exactly right, and they introduce many numerical artifacts.

  • ✘ Scales poorly with the length of simulated time (because of the required FFT).

  • ✘ Fixed simulation time: cannot generate new points to extend the trace.

  • ✔ Moderately fast during evaluation: Number of operations scales with the number of sine components (and therefore both the resolution \(Δt\) and the total simulation time).

Perlin noise
  • ✘ Intended for computer graphics: designed for numerical efficiency, not to have well-characterized statistics (beyond “they look nice”).

  • Similarly, existing libraries which implement Perlin noise are intended for computer graphics, and I would not consider them suitable for scientific applications since the statistics are difficult to characterize.

  • I didn’t consider this further: one would need to write their own “scientific-grade” noise model, in which case they might as well use a simpler algorithm like the sparse convolutions.

Implementation#

from collections.abc import Callable
from dataclasses import dataclass
from typing import Union

import copy
import math
import numpy as np
import numpy.random as random
from numpy.typing import ArrayLike
try:
    import jax.numpy as jnp
    import jax.lax as lax
except ImportError:
    jnp = np
    class lax:
        @staticmethod
        def dynamic_slice(operand, start_indices, slice_sizes):
            slcs = tuple(slice(i,i+l) for i, l in zip(start_indices, slice_sizes))
            return operand[slcs]
from scityping.pint import Quantity
Hide code cell content
# Optional serialization dependencies with fallbacks
# (If these are not available, the noise objects still work, they just
# can’t be serialized.)
try:
    import pint
except ImportError:
    class pint:
        class Quantity:
            pass
try:
    from scityping import Serializable
    from scityping.numpy import Array, RNGenerator
    from scityping.pint import Quantity
    from scityping.pydantic import BaseModel
except ImportError:
    Serializable = object
    BaseModel = object
    class Quantity:
        pass

__all__ = ["ColoredNoise"]

One small complication with the definition in Eq. (2) for the Poisson noise is that for a given \(t\), we don’t know which \(t_k\) are near enough to contribute. And for a long time trace, we definitely don’t want to sum tens of thousands of impulses at every \(t\), when only a few dozen contribute. So following Lewis (1989), we split the time range into bins and generate the same number of impulses within each bin. Then for any \(t\) we can compute with simple arithmetic the bin which contains it, and sum only over neighbouring bins.

ColoredNoise#

The default ColoredNoise class produces noise with a Gaussian kernel, Gaussian weights \(a\), and a Gaussian autocorrelation:

Parameters
  • \(τ\): Correlation time. Equivalent to the standard deviation of the autocorrelation function.

  • \(σ\): Overall noise strength; more specifically its standard deviation: \(\braket{ξ(t)^2} = σ^2\).

  • \(ρ\): Impulse density, in units of \(\frac{\text{# impulses}}{τ}\). For numerical reasons this must be an integer.
    This affects what Lewis calls the “quality” of the noise; a too low number will show artifacts (high peaks), which only disappear after averaging many bins or realizations. Note that increasing \(ρ\) does not cause the noise to wash out, but rather the effect saturates: Lewis reports that generated noises stop being distinguishable when \(ρ\) is greater than 10. In my own tests I observe something similar, even though my implementation is quite different. Part of the reason for this is that we scale the variance of the \(a_k\) to keep the overall variance of \(ξ\) constant.

TODO

Add expression for \(S_{ξξ}\).

Autocorrelation

\(\displaystyle C_{ξξ}(s) = σ^2 e^{s^2/2τ^2}\)

Accessible as .autocorr

Kernel

\(\displaystyle h(s) = e^{-s^2/τ^2}\)

Accessible as .h

Weight distribution

\(\displaystyle a_k \sim \mathcal{N}\left(0, σ \sqrt{\frac{1}{ρ}\sqrt{\frac{2}{π}}} \right)\)

Pre-computed within __init__

Binning

Bin size: \(τ\)

Summation: 5 bins left and right of \(t\) (so a total of 11 bins)

Note

“non-factored” refers to an previous implementation where all operations in __call__ were performed with united values, instead of factoring out the units as _t_unit and _ξ_unit attributes and only adding them at the end. At the time I didn’t run the test with \(ρ=1000\), which is why it is missing from the table.

Table 1 Anecdotal timings (\(t\) scalar, \(ξ\) scalar)#

\(ρ=10\)

\(ρ=30\)

\(ρ=100\)

\(ρ=400\)

\(ρ=1000\)

Intel Xeon W-2265, 3.50GHz
No units

9.2 ± 0.06 μs

10.2 ± 0.08 μs

12.3 ± 0.07 μs

19.0 ± 0.11 μs

32.1 ± 0.45 μs

Intel Xeon W-2265, 3.50GHz
With pint units, non-factored

270 ± 2.7 μs

273 ± 1.6 μs

290 ± 3.0 μs

311 ± 4.3 μs

class ColoredNoise(Serializable):
    """
    Simplified solid noise with some predefined choices: exponential kernel :math:`h`
    and Gaussian weights :math:`a`. The specification parameters are the overall variance σ²,
    the correlation time τ and the number of impulses in a time interval of length τ.
    
    Note
    ----
    This class supports specifying values with units specified using the `pint` library.
    This is an excellent way of avoiding some common mistakes. When doing so, all units
    must be consistent; in particular, ``t/τ`` must be dimensionless in order for the
    exponential to be valid.
    Using units does add a bit of overhead, so for performance-critical code omitting
    them may be preferred.
    """

    # Inspection attributes (not uses when generating points)
    : float                                # Stored without units
    : float                                # Stored without units
    ρ: float
    _bin_edges: np.ndarray[float]
    _t_max: float                            # Stored without units
    # rng: np.random.Generator
    _rng_state: "np.random.Generator->state"  # Restore with `recreate_rng`
    _t_unit: Union[int, pint.Quantity]
    _ξ_unit: Union[int, pint.Quantity]
    # Attributes used when generating noise
    _t_min: float                            # Stored without units
    : float                                # Stored without units
    _t_arr: np.ndarray[float]                # Stored without units
    _a_arr: np.ndarray[float]                # Stored without units
    
    
    def __init__(self,
                 t_min    : Union[float,pint.Quantity],
                 t_max    : Union[float,pint.Quantity],
                 corr_time: Union[float,pint.Quantity],
                 scale: ArrayLike, 
                 impulse_density: int,
                 rng: Union[int,random.Generator,random.SeedSequence,None]=None):
        """
        Multivariate outputs can be generated by passing an array of appropriate shape
        as the `scale` parameter. Each component is uncorrelated with the others, so
        this is equivalent (but faster) to generating N independent noise sources.
        
        Parameters
        ----------
        t_min, t_max: Determine the window of time within which to generate noise.
           The noise function can still be called outside this window, but decays
           to zero the further we are from the window.
           (Specifically, `t_min` and `t_max` determine the window of the underlying
           Poisson noise process γ. We take care to extend the window sufficiently
           so that the noise quality is the same from `t_min` to `t_max`.)
        corr_time: Correlation time (τ). If the kernel is given by :math:`e^{-λ|τ|}`,
           this is :math:`τ`.
        scale: (σ) Standard deviation of the generated noise.
           The shape of `scale` determines the shape of the output.
        impulse_density: (ρ) The expected number of impulses within one correlation
           time of a test time.
        rng: Either a random seed, or an already instantiated NumPy random `Generator`.

        .. Note::
           To specify units, use `pint.Quantities` objects for the parameters
           `t_min`, `t_max`, `corr_time` and/or `scale`.

           - `t_min`, `t_max` and `corr_time` determine the units of the input.
             They must have matching units ('ms' and 's' is fine, but not 'ms' and 'km').
           - `scale` determines the units of the output.
        """
        ### Convention: '_' prefix means without units
        
        rng = random.default_rng(rng)
        if not isinstance(rng, np.random.Generator):
            raise TypeError("Please use a `numpy.random.Generator` for the RNG, "
                            f"or just specify a seed.\nReceived {type(rng)}.")
        # Store the RNG state so the noise can be serialized & recreated
        self._rng_state = rng.bit_generator.state
        
        self._t_unit = getattr(corr_time, "units", 1);
        self._ξ_unit = getattr(scale, "units", 1);
        
        # Compute the std dev for the weights
         = jnp.asarray(getattr(scale, "magnitude", scale))
        τ, ρ = corr_time, impulse_density
        _a_std =  * jnp.sqrt(1/ρ) * (2/np.pi)**(0.25)
        
        # Ensure τ is in the same units as t_arr
        if hasattr(τ, "units"):
            if not hasattr(t_min, "units") or not hasattr(t_max, "units"):
                raise ValueError("If `corr_time` is specified with units, then also `t_min` and `t_max` must have units.")
            t_min = t_min.to(τ.units)
            t_max = t_max.to(τ.units)
            
        # Define unitless vars
        _t_min = getattr(t_min, "magnitude", t_min)
        _t_max = getattr(t_max, "magnitude", t_max)
         = getattr(τ, "magnitude", τ)
        
        # Discretize time range into bins of length τ
        # Following Lewis, we draw the impulses in bins.
        # Each bin has exactly the same number of impulses, allowing us in __call__ to
        # efficiently restrict ourselves to nearby points.
        # NB: We always take 5 bins to the left and right, so we also pad that many bins.
        Nbins = math.ceil((_t_max - _t_min) / ) + 10
        _self_t_min = _t_min - 5*
        #_self_t_max = self_t_min + (Nbins+1)*_τ
            
        # Draw the impulse times
        # First axis: bins
        # Second axis: impulses in that bin
        # All subsequent axes: shape of σ (and of output)
        _bin_edges = np.arange(Nbins+1) *  + _self_t_min  # We use all dimensionless variables here
        a_axes = (np.newaxis,)*.ndim
        impulse_data_shape = (Nbins, impulse_density, *.shape)
        _t_arr = rng.uniform(_bin_edges[(slice(None,-1),np.newaxis,*a_axes)], _bin_edges[(slice(1,None),np.newaxis,*a_axes)],
                             size=impulse_data_shape
                            )
        
        # Draw the weights
        _a_arr = rng.normal(0, _a_std, impulse_data_shape)
        
        # Store attributes
        self. = 
        assert self..shape == np.shape(_a_std)
        self. = 
        self.ρ = ρ
        self. = 1/self.  # With floating point numbers, multiplication is faster than division
        # self.rng = rng
        self._bin_edges = _bin_edges
        self._t_min = _t_min
        self._t_max = _t_max
        self._t_arr = _t_arr
        self._a_arr = _a_arr
        
        # Precomputed window sizes
        pad=5
        self._bin_t_size = (2*pad+1, _t_arr.shape[1])
        self._bin_data_start = (0,)*.ndim
        
    def __call__(self, t, *, pad=5, exp=jnp.exp, int32=jnp.int32):
        # Compute the index of the bin containing t
        if hasattr(t, "units"):
            t = t.to(self._t_unit).magnitude
        i = (t-self._t_min) * self.; i = getattr(i, "magnitude", i)  # Suppress Pint warning that int32 removes dim
        i = int32(i) + pad
        # Compute the noise at t
        #tk = self.t_arr[i-pad:i+pad+1]
        #a = self.a_arr[i-pad:i+pad+1]
        size = self._bin_t_size + self..shape
        tk = lax.dynamic_slice(self._t_arr, (i-pad, int32(0)) + (0,)*self.σ.ndim, size)
        a  = lax.dynamic_slice(self._a_arr, (i-pad, int32(0)) + (0,)*self.σ.ndim, size)
        h = exp(-((t-tk)*self.)**2)  # Inlined from self.h
        return (a * h).sum(axis=(0,1)) * self._ξ_unit   # The first two dimensions are time. Extra dimensions are data dimensions, which we want to keep
    
    def new(self, rng=None, **kwargs):
        """Create a new model, using the values of this one as defaults.
        
        NB: Unless specified, this uses the RNG returned by `np.random.default_rng()`.
            This is independent of whether an RNG was given when creating the original
            noise object. Use the `rng` keyword to control the state of the RNG, or
            ensure that the noise is reproducible.
        """
        rng = np.random.default_rng(rng)
        defaults = dict(t_min=self.t_min, t_max=self.t_max, corr_time=self.τ,
                        scale=self.σ,
                        impulse_density=self.ρ, rng=rng)
        return ColoredNoise(**(defaults | kwargs))
    
    def h(self, s):
        return np.exp(-(s*self.λ)**2)
    
    def autocorr(self, s):
        """Evaluate the theoretical autocorrelation at lag s."""
        if not hasattr(s, "units"):
            s = s * self._t_unit
        return self.σ**2 * np.exp(-0.5*(s*self.λ)**2)
    
    @property
    def impulse_density(self):
        return self.ρ
    @property
    def t_arr(self):
        return self._t_arr * self._t_unit
    @property
    def t_min(self):
        return self._t_min * self._t_unit
    @property
    def t_max(self):
        return self._t_max * self._t_unit
    @property
    def a_arr(self):
        return self._a_arr * self._ξ_unit
    @property
    def τ(self):
        return self. * self._t_unit
    @property
    def corr_time(self):
        return self.τ
    @property
    def λ(self):
        return 1 / self.τ
    @property
    def σ(self):
        return self. * self._ξ_unit
    @property
    def scale(self):
        return self.σ
    @property
    def bin_edges(self):
        return self._bin_edges * self._t_unit
    
    @property
    def T(self):
        return self.t_max - self.t_min
    
    @property
    def Nleftpadbins(self):
        return 5
    @property
    def Nrightpadbins(self):
        return 5
    @property
    def Nbins(self):
        """Return the number of bins, excluding those included for padding."""
        return len(self.t_arr) - self.Nleftpadbins - self.Nrightpadbins

    def qty_to_mag(self, time=None, scale=None) -> "ColoredNoise":
        """Convert dimensioned quantities to pure magnitudes.

        Useful to provide plain arrays to low-level functions, while still
        using units for high-level functions.

        The `time` and `scale` are used to specify into which units to convert
        before retrieving the magnitude. For example, `scale` may have been
        given meters, but the low-level function expects millimeters.
        If these are not specified, then the current units are used.

        If quantities are already without units, then they are silently ignored.
        This is to allow functions to use `qty_to_mag` as a normalizer, when they
        don’t know if their argument is dimensioned.
        """
        # Define `t_unit` and `ξ_unit` as dimensionless scaling factors
        if isinstance(self._t_unit, (pint.Unit, pint.Quantity)):
            if time is None:
                t_unit = 1
            else:
                t_unit = (1*self._t_unit).to(time).m
        else:
            t_unit = 1
        if isinstance(self._ξ_unit, (pint.Unit, pint.Quantity)):
            if scale is None:
                ξ_unit = 1
            else:
                ξ_unit = (1*self._ξ_unit).to(scale).m
        else:
            ξ_unit = 1
        # Use the underlying dimensionless attributes, and multiply by scaling factors
        return ColoredNoise(t_min    =self._t_min*t_unit,
                            t_max    =self._t_max*t_unit,
                            corr_time=self.    *t_unit,
                            scale    =self.    *ξ_unit,
                            impulse_density=self.impulse_density,
                            rng      =recreate_rng(self._rng_state)
        )

    # __eq__ and __hash__are mostly implement to support caching
    # with functools.lru_cache.

    def __eq__(self, other):
        return (type(self) == type(other)
                and self.t_min == other.t_min
                and self.t_max == other.t_max
                and self.corr_time == other.corr_time
                and self.scale == other.scale
                and self.impulse_density == other.impulse_density
                and self._rng_state == other._rng_state)
    def __hash__(self):
        # NB: self._σ may be a JAX variable => we need to convert to python scalar to make it hashable
        return hash((self._t_min, self._t_max, self., str(self._t_unit),
                     float(self.), str(self._ξ_unit),
                     self.ρ, str(self._rng_state)))

    ## Scityping serializer ##
    class Data(BaseModel):   # NB: Here we need to use the types from scityping
        t_min    : Union[float,Quantity]
        t_max    : Union[float,Quantity]
        corr_time: Union[float,Quantity]
        scale          : Union[Array,Quantity]
        impulse_density: int
        rng            : RNGenerator
        @staticmethod
        def encode(noise: "ColoredNoise"):
            return (
                noise.t_min, noise.t_max, noise.corr_time, noise.scale,
                noise.impulse_density, recreate_rng(noise._rng_state))

Validation of theoretical expressions#

Hide code cell source
import itertools
import scipy.signal as signal
import holoviews as hv
import pint
from types import SimpleNamespace
from myst_nb import glue
ureg = pint.UnitRegistry()
ureg.default_format = "~P"
hv.extension("bokeh", "matplotlib")
%matplotlib inline
from tqdm.notebook import tqdm
Hide code cell source
dims = SimpleNamespace(
    t  = hv.Dimension("t", label="time", unit="ms"),
    Δt = hv.Dimension("Δt", label="time lag", unit="ms"),
    ξ  = hv.Dimension("ξ"),
    ξ2 = hv.Dimension("ξ2", label="⟨ξ²⟩"),
    T  = hv.Dimension("T", label="realization length", unit="ms"),
    σ  = hv.Dimension("σ", label="noise strength", unit="√ms"),
    τ  = hv.Dimension("τ", label="correlation length", unit="ms"),
    ρ  = hv.Dimension("ρ", label="impulse density"),
    N  = hv.Dimension("N", label="# realizations")
)
colors = hv.Cycle("Dark2").values
Hide code cell source
experiment_iter = tqdm(exp_conds, "Exp. cond.")
for T, σ, τ, ρ, N in experiment_iter:
    if (T, σ, τ, ρ, N) in (frames_realizations.keys() & frames_autocorr.keys())  :
        continue
    
    noise = ColoredNoise(0, T, corr_time=τ, scale=σ, impulse_density=ρ, rng=seedseq)
    t_arr = np.linspace(noise.t_min, noise.t_max, int(10*T/noise.τ))

    ## Generate the realizations and compute their autocorrelation ##
    L = len(t_arr)
    Δt = np.diff(t_arr).mean()
    norm = signal.correlate(np.ones(L), np.ones(L), mode="same")  # Counts the number of sums which will contribute to each lag
    lags = signal.correlation_lags(L, L, mode="same") * Δt
    ξ_arr = np.empty((N, L))
    Cξ_arr = np.empty((N, L))
    for i, key in enumerate(tqdm(seedseq.spawn(N), "Seeds", leave=False)):
        _noise = noise.new(rng=key)
        ξ = np.fromiter((_noise(t) for t in t_arr), count=len(t_arr), dtype=float)
        ξ_arr[i] = ξ
           = signal.correlate(ξ, ξ, mode="same") / norm
        Cξ_arr[i] = 
     = Cξ_arr.mean(axis=0)

    ## Generator realization curves ##
    realization_samples = hv.Overlay([
        hv.Curve(zip(t_arr, ), kdims=dims.t, vdims=dims.ξ, label="Single realization")
        for  in ξ_arr[:n_realizations_shown]
    ])
    
    ## Generate autocorr curves ##
    autocorr_samples = hv.Overlay([
        hv.Curve(zip(lags, _Cξ), kdims=dims.Δt, vdims=dims.ξ2, label="Single realization")
        for _Cξ in Cξ_arr[:n_realizations_shown]]
    )
    avg =  hv.Curve(zip(lags, ), kdims=dims.Δt, vdims=dims.ξ2, label=f"Average ({N} realizations)")
    target = hv.Curve(zip(lags, noise.autocorr(lags)), kdims=dims.Δt, vdims=dims.ξ2, label="Theoretical")
    
    ## Compute axis range so it is appropriate for mean and target autocorr – individual realizations may be well outside this range ##
    ymax = max(avg.range("ξ2")[1], target.range("ξ2")[0])
    ymin = min(avg.range("ξ2")[0], target.range("ξ2")[0])
    Δy = ymax-ymin
    ymax += 0.05*Δy
    ymin -= 0.05*Δy
    # Round ymin down, round ymax up
    p = math.floor(np.log10(ymax-ymin)) + 2  # +2: between 10 and 100 ticks in the range
    new_range = (round(math.floor(ymin * 10**p) / 10**p, p),
                 round(math.ceil (ymax * 10**p) / 10**p, p))

    ## Assemble figures ##
    # Use random shades of grey for realization curves so we can distinguish them
    shades = np.random.uniform(.45, .8, size=n_realizations_shown)
    
    fig_autocorr = autocorr_samples * avg * target
    fig_autocorr.opts(ylim=new_range)
    fig_autocorr.opts(
        hv.opts.Curve(height=300, width=400),
        hv.opts.Curve("Curve.Single_realization", color="#DDD"),
        hv.opts.Curve("Curve.Average", color=colors[0]),
        hv.opts.Curve("Curve.Prescribed", color=colors[1], line_dash="dashed"),
        hv.opts.Overlay(title="Autocorrelation", legend_position="top"),
    )
    for curve, c in zip(fig_autocorr.Curve.Single_realization, shades):
        curve.opts(color=(c,)*3)
    
    fig_realizations = realization_samples
    fig_realizations.opts(
        hv.opts.Curve(height=300, width=400),
        #hv.opts.Curve("Curve.Single_realization", color="#DDD"),
        hv.opts.Overlay(title="Noise realizations", show_legend=False)
    )
    for curve, c in zip(fig_realizations, shades):
        curve.opts(color=(c,)*3)
    
    frames_realizations[(T, σ, τ, ρ, N)] = fig_realizations
    frames_autocorr[(T, σ, τ, ρ, N)] = fig_autocorr
No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)

Usage examples#

Scale with units

noise = ColoredNoise(t_min    = 0. *ureg.ms,
                     t_max    =10. *ureg.ms,
                     corr_time= 1. *ureg.ms,
                     scale    = 2.2*ureg.mV,
                     impulse_density=30,
                     rng=1337)
assert noise.Nbins == 10
expected_bin_edges = np.array([-5., -4., -3., -2., -1.,  0.,  1.,  2.,  3.,  4.,  5., 
                               6.,  7., 8.,  9., 10., 11., 12., 13., 14., 15.])*ureg.ms
assert np.allclose(noise.bin_edges, expected_bin_edges)
noise(1.)
0.7877495884895325 mV

Scalar output

noise = ColoredNoise(t_min    =   0. *ureg.ms,
                     t_max    =1000. *ureg.ms,
                     corr_time=   1. *ureg.ms,
                     scale    =   2.2,
                     impulse_density=30)
ξ = np.array([noise(t) for t in np.linspace(noise.t_min, noise.t_max, 1000)])
ξ.std(axis=0)
2.1664157

1d output

noise = ColoredNoise(t_min    =   0. *ureg.ms,
                     t_max    =1000. *ureg.ms,
                     corr_time=   1. *ureg.ms,
                     scale    =np.array([2.2, 1.1]),
                     impulse_density=30)
ξ = np.array([noise(t) for t in np.linspace(noise.t_min, noise.t_max, 1000)])
ξ.std(axis=0)
array([2.2753417, 1.1122088], dtype=float32)

2d output

noise = ColoredNoise(t_min    =   0. *ureg.ms,
                     t_max    =1000. *ureg.ms,
                     corr_time=   1. *ureg.ms,
                     scale    =[[2.2, 1.1],
                                [3.3, 4.4]],
                     impulse_density=30)
ξ = np.array([noise(t) for t in np.linspace(noise.t_min, noise.t_max, 1000)])
ξ.std(axis=0)
array([[2.1002111, 1.1089559],
       [3.2021656, 4.2138753]], dtype=float32)